In this notebok, I'll generate a time series for testing the Cobalt beamformed pipeline. The output will be an ascii file containing the data for a single sub band, one sample per line, with four integers representing thre real and imaginary parts of the x signal, and the real and imaginary parts of the y signal, respectively.
In [1]:
%pylab inline
def float_to_int(time_series_x, time_series_y):
scale = 127/max(abs(time_series_x).max(), abs(time_series_y).max())
x_re = rint(scale*time_series_x.real)
x_im = rint(scale*time_series_x.imag)
y_re = rint(scale*time_series_y.real)
y_im = rint(scale*time_series_y.imag)
return x_re, x_im, y_re, y_im
def write_timeseries(time_series_x, time_series_y, file_name):
x_re, x_im, y_re, y_im = float_to_int(time_series_x, time_series_y)
with open(file_name, mode='w') as output:
for xre, xim, yre, yim in zip(x_re, x_im, y_re, y_im):
output.write('%4d %4d %4d %4d\n' %
(int(xre), int(xim), int(yre), int(yim)))
We generate the timeseries by beginning with random Gaussian noise for the real and imaginary parts, to obtain a flat spectral response. The resulting timeseries is subsequently multiplied with the amplitude of the pulsar signal as a function of time.
In [2]:
def complex_noise(num_samples, sigma=1.0):
r'''
Return numpy.ndarray of complex valued random noise with
sigma(real) == sigma(imag) == sigma.
**Parameters**
num_samples : integer
Total number of complex samples to return.
sigma : float
The standard deviation of the real and imaginary parts
>>> len(complex_noise(10))
10
'''
return random.normal(loc=0.0, scale=sigma, size=num_samples) + 1.0j*random.normal(loc=0.0, scale=sigma, size=num_samples)
We also need to define the parameters of the simulation, and generate the white noise timeseries:
In [3]:
time_resolution_s = 512./100e6
duration_s = 20.0
num_samples = int(ceil(duration_s/time_resolution_s))
time = arange(num_samples)*time_resolution_s
white_noise_x = complex_noise(num_samples)
white_noise_y = complex_noise(num_samples)
In [4]:
write_timeseries(white_noise_x, white_noise_y, 'synthetic-white-noise-1SB.txt')
In [5]:
figure(figsize=(16,4.5))
x_re, x_im, y_re, y_im = float_to_int(white_noise_x, white_noise_y)
wn_spec = fft.fft(x_re**2 +x_im**2 + y_re**2 +y_im**2)
wn_spec[0] = 0
plot(abs(wn_spec[0:2000]))
Out[5]:
We now define the pulsar as a python function that provides the amplitude as a function of time.
In [6]:
def pulsar_amplitude(time_s, period_s, fraction_on):
r'''
Compute the amplitude of the "pulsar" signal as a function of time.
**Parameters**
time_s : array of float
The time in seconds at which to evaluate the amplitude.
period_s : float
The period of the block wave in s.
fraction_on: float
Fraction of the period that the pulse must be "on". It will have
zero amplitude in the rest of the period.
**Example**
>>> sorry, nothing yet.
'''
time_in_periods = time_s/period_s
amplitudes = 1.0*(time_in_periods - floor(time_in_periods) < fraction_on)
return amplitudes
In [7]:
figure(figsize=(16, 4.5))
plot(time[0:200000], pulsar_amplitude(time[0:200000], 0.1, 0.1), linewidth=3)
xlabel('Time [s]')
ylabel('Amplitude [arbitrary]')
title('Pulsar amplitude with period 100 ms and duty-cycle of 10%')
Out[7]:
In [8]:
psr_amp = pulsar_amplitude(time, period_s=0.1, fraction_on=0.1)
signal_x = white_noise_x*psr_amp
signal_y = white_noise_y*psr_amp
selection = 50000
figure(figsize=(16, 4.5))
plot(time[:selection], real(signal_x[:selection]), label='Real [x]', color='blue', linewidth=3, alpha=0.5)
plot(time[:selection], real(signal_y[:selection]), label='Real [y]', color='red', linewidth=3, alpha=0.5)
legend()
xlabel('Time [s]')
ylabel('Voltage [arbitrary]')
title('Simulated noiseless pulsar time series, one subband, no dispersion, critically sampled')
Out[8]:
In [9]:
write_timeseries(signal_x, signal_y, 'synthetic_block_psr_100ms_no_noise.txt')
In [10]:
figure(figsize=(16,9))
dynspec_x = array([fft.fftshift(fft.fft(spec)) for spec in (signal_x[:64*int(floor(len(signal_x)/64.))].reshape((-1, 64)))[:800,:]])
imshow(abs(dynspec_x).T, origin='lower', interpolation='nearest')
colorbar()
axis('tight')
xlabel('time slot')
ylabel('channel')
Out[10]:
We now need a specific frequency spectrum. A FIR filter with a Gaussian frequency response would enable easy fitting for the exact frequency of the peak. The (un-normalized) functional form of the frequency response that we require is
\begin{equation} f(\nu) = \mathrm{e}^{-\frac{(\nu - \nu_\mathrm{peak})^2}{2\sigma_\nu^2}} \end{equation}Given that LOFAR uses a Fourier transform with a minus sign in the exponent for the time-to-frequency transform, we have to use the transform with a plus sign in the exponent to transform this response to the time domain. The unnormalized result is
\begin{equation} w(t) = \mathrm{e}^{-2\pi^2t^2\sigma_\nu^2}\mathrm{e}^{+2\pi\mathrm{i}\nu_\mathrm{peak} t} \end{equation}
In [11]:
def raw_fir_coefficients(time_s, peak_hz, sigma_hz):
r'''
Compute
math ::
w(t) = \mathrm{e}^{-2\pi^2t^2\sigma_\nu^2}\mathrm{e}^{+2\pi\mathrm{i}\nu_\mathrm{peak} t}
'''
return exp(2.j*pi*peak_hz*time_s -2*(pi*time_s*sigma_hz)**2)
In [12]:
filter_length = 513
filter_time_s = (arange(filter_length) - floor(filter_length/2))*time_resolution_s
f_k = raw_fir_coefficients(filter_time_s, peak_hz=-14038.086, sigma_hz=12e3)
f_k /= f_k.sum()
fir_taper = kaiser(filter_length, beta=14.0)
spectral_signal_x = convolve(white_noise_x, f_k*fir_taper, mode='valid')
spectral_signal_x *= psr_amp[:len(spectral_signal_x)]
spectral_signal_y = convolve(white_noise_y, f_k*fir_taper, mode='valid')
spectral_signal_y *= psr_amp[:len(spectral_signal_y)]
In [52]:
figure(figsize=(16,9))
dynspec_x = array([fft.fftshift(fft.fft(spec*kaiser(len(spec), beta=14.))) for spec in (spectral_signal_x[:64*int(floor(len(spectral_signal_x)/64.))].reshape((-1, 64)))[:800,:]])
dynspec_y = array([fft.fftshift(fft.fft(spec*kaiser(len(spec), beta=14.))) for spec in (spectral_signal_y[:64*int(floor(len(spectral_signal_x)/64.))].reshape((-1, 64)))[:800,:]])
stokes_i = abs(dynspec_x)**2 + abs(dynspec_y)**2
stokes_i /= stokes_i.max()
imshow(log10(abs(stokes_i)).T, origin='lower', interpolation='nearest')
colorbar()
axis('tight')
xlabel('time slot')
ylabel('channel')
Out[52]:
write_timeseries(spectral_signal_x, spectral_signal_y, 'synthetic_block_psr_100ms_gauss_spectrum_no_noise.txt')
We also need to test multiple sub bands.
Now, what do the data look like?
In [61]:
tf_data_gauss_spectrum = fromfile('tab0-gauss.raw', dtype=float32).reshape((-1, 64))
tf_data_gauss_spectrum /= tf_data_gauss_spectrum.max()
In [62]:
figure(figsize=(16, 9), dpi=100)
imshow(log10(tf_data_gauss_spectrum.T[:,:800]), interpolation='nearest', origin='lower')
axis('tight')
colorbar()
Out[62]:
In one plot:
In [86]:
dynspec_time_ms = time[32::64]*1000
extent = (dynspec_time_ms[0], dynspec_time_ms[799], -0.5, 63.5)
figure(figsize=(16, 9), dpi=100)
subplot(211)
imshow(log10(abs(stokes_i)).T, origin='lower', interpolation='nearest', vmax=0.0, vmin=-10, extent=extent)
cb_input = colorbar()
cb_input.set_label(r'$^{10}$log(I)')
axis('tight')
xlabel('Time [ms]')
ylabel('channel')
title('Input data')
grid()
subplot(212)
offset=7
imshow(log10(tf_data_gauss_spectrum.T[:,offset:800+offset]), interpolation='nearest', origin='lower', vmax=0.0, vmin=-10, extent=extent)
axis('tight')
cb_output = colorbar()
cb_output.set_label(r'$^{10}$log(I)')
xlabel('Time [ms]')
ylabel('channel')
title('Cobalt output')
grid()
savefig('../cobalt-commissioning-report/cobalt-bf-dynamic-spectrum-gauss-pulsar.pdf')
In [107]:
figure(figsize=(16, 4.5), dpi=200)
input_spectrum = stokes_i.sum(axis=0)
input_spectrum /= input_spectrum.max()
output_spectrum = tf_data_gauss_spectrum.sum(axis=0)
output_spectrum /= output_spectrum.max()
#plot(input_spectrum, color='blue', linewidth=3, label='Input')
plot(output_spectrum, color='blue', linewidth=3, label='Cobalt output')
xlabel('Channel')
ylabel('Amplitude')
title('Spectral response')
legend()
grid()
xticks(arange(64), ['%d' % chan for chan in arange(64)])
xlim(20,40)
peak_channel = 32-14038.086/(100e6/512./64.)
annotate('Expected peak', (peak_channel, 0.98), xytext=(peak_channel, 0.7), arrowprops={'linewidth':1}, rotation=90, horizontalalignment='center')
savefig('../cobalt-commissioning-report/cobalt-bf-spectrum-gauss-pulsar.pdf')
In [108]:
figure(figsize=(16, 4.5), dpi=200)
input_ts = stokes_i.sum(axis=1)
input_ts /= input_ts.max()
output_ts = tf_data_gauss_spectrum.sum(axis=1)
output_ts /= output_ts.max()
plot(dynspec_time_ms[:800], input_ts[:800], color='red', linewidth=3, label='Input')
plot(dynspec_time_ms[:800] -2.5, output_ts[:800], color='blue', linewidth=3, label='Cobalt output')
xlabel('Time [ms]')
ylabel('Amplitude')
xticks(arange(200), ['%d' % chan for chan in arange(200)])
title('Temporal response')
legend()
grid()
xlim(98,112)
yscale('log')
ylim(1e-3, 1.0)
savefig('../cobalt-commissioning-report/cobalt-bf-timeseries-gauss-pulsar.pdf')
In [ ]: